Gans, an e-scooter rental company, has seen that its operational success depends on having its e-scooters parked where users need them. The task of this project is to collect data from external sources that can help Gans predict e-scooter movement, by building a fully automated system in the cloud.
from IPython.display import Image
Image(filename='scoothers.png',width=800, height=400)
Ideally, scooters get rearranged organically: some users move from point A to point B, and an equal number of users move from point B to point A. In practice, several factors create asymmetries. Here are some of them:
There are some actions that the company can perform to solve these asymmetries, namely:
Since the data is needed every day, in real time, and must be accessible to everyone in the company, the challenge is to assemble and automate a data pipeline in the cloud. Achieving this task required the following:
1) Python scripts to collect data from different API sources
2) A relational database (DB) created from scratch with MySQL on the AWS Relational Database Service (RDS)
3) Automated data collection into the DB using AWS Lambda functions
from IPython.display import Image
Image(filename='data_pipeline.png',width=800, height=400)
NOTE: Due to the limitations of the free-tier API plans, this project used a sample of 15 cities to collect the geographical, airport, and weather information, and just 1 airport to collect the flight data.
CSV example:
name,country
London,United Kingdom
Berlin,Germany
Madrid,Spain
Paris,France
Bucharest,Romania
Budapest,Hungary
Hamburg,Germany
Warsaw,Poland
Vienna,Austria
Barcelona,Spain
Stockholm,Sweden
Belgrade,Serbia
Munich,Germany
Rome,Italy
Porto,Portugal
The config.py file holds all your secret information, such as API keys, API user names, and the database access parameters, assigned to different variables so they can be called from the main scripts.
The following code was used to collect the geographical data and generate the CSV file that will be used to fill the cities table in the database.
import geocoder
import pandas as pd
import config as cfg

wc = pd.read_csv('world_cities.csv')
countries = pd.read_csv('EuroCitiesPopulation15_.csv').country
cities = pd.read_csv('EuroCitiesPopulation15_.csv').name

# Look up the GeoNames id of every (country, city) pair:
city_id = []
for co, ci in zip(countries, cities):
    city_id.append(wc.set_index('country').loc[co].set_index('name').loc[ci].geonameid)

lat, lng, population, country_code, time_zone = [], [], [], [], []
east, south, north, west = [], [], [], []

# DATA COLLECTION:
for c in city_id:
    g = geocoder.geonames(c, method='details', key=cfg.GEO_USERNAME)
    props = g.geojson['features'][0]['properties']
    lat.append(props['lat'])
    lng.append(props['lng'])
    population.append(props['population'])
    country_code.append(props['country_code'])
    time_zone.append(3600 * props['raw']['timezone']['gmtOffset'])  # offset in seconds
    east.append(props['raw']['bbox']['east'])
    south.append(props['raw']['bbox']['south'])
    north.append(props['raw']['bbox']['north'])
    west.append(props['raw']['bbox']['west'])
cities_dic = {'city_id': city_id,
              'city': cities,
              'country': countries,
              'Code': country_code,
              'Population': population,
              'Time_Zone': time_zone,
              'Latitud': lat,
              'Logitud': lng,
              'East': east,
              'South': south,
              'North': north,
              'West': west}

cities_df = pd.DataFrame.from_dict(cities_dic)
cities_df.to_csv('cities.csv', index=False)
The following code was used to collect the airports data and generate the CSV file that will be used to fill the airports table in the database.
import requests
import datetime
import pandas as pd
import numpy as np
import config as cfg
cities_df = pd.read_csv('cities.csv')
cities = cities_df['city']

# Collect every airport within 50 km of each city's coordinates:
rows = []
for city in cities:
    lat = float(cities_df.loc[cities_df['city'] == city, 'Latitud'].iloc[0])
    lon = float(cities_df.loc[cities_df['city'] == city, 'Logitud'].iloc[0])
    url = f"https://aerodatabox.p.rapidapi.com/airports/search/location/{lat}/{lon}/km/50/16"
    querystring = {"withFlightInfoOnly": "0"}
    headers = {
        "X-RapidAPI-Key": cfg.RAPIDAPI_KEY,
        "X-RapidAPI-Host": "aerodatabox.p.rapidapi.com"
    }
    response = requests.request("GET", url, headers=headers)  #, params=querystring)
    print('Status code', response.status_code)
    airp_js = response.json()
    for a in airp_js["items"]:
        rows.append({'city': city,
                     'city_id': cities_df[cities_df['city'] == city].city_id.iloc[0],
                     'lat': a["location"]["lat"],
                     'lon': a["location"]["lon"],
                     'icao': a["icao"],
                     'iata': a["iata"],
                     'name': a["name"]})

airports = pd.DataFrame(rows, columns=['city', 'city_id', 'lat', 'lon', 'icao', 'iata', 'name'])
airports.to_csv('airports_new.csv', index=False)
Collecting the flights and weather data was placed at the end of the pipeline, in the AWS Lambda automated data collection, because it requires the cities and airports information from the database.
The following SQL code creates the database and all the table schemas.
CREATE DATABASE gans;
USE gans;
CREATE TABLE IF NOT EXISTS cities (
city_id INT,
city VARCHAR(200),
country VARCHAR(200),
country_code CHAR(2),
population INT,
time_zone INT,
latitude FLOAT,
longitude FLOAT,
PRIMARY KEY(city_id)
);
SELECT * FROM cities;
CREATE TABLE IF NOT EXISTS weathers (
weather_id INT auto_increment,
city_id INT,
time_utc INT,
local_time CHAR(20),
temperature FLOAT,
humidity FLOAT,
cloudiness_pc INT,
wind_speed FLOAT,
precipitation_prob FLOAT,
rain_volume FLOAT,
snow_volume FLOAT,
PRIMARY KEY(weather_id),
FOREIGN KEY(city_id) REFERENCES cities(city_id)
);
SELECT * FROM weathers;
CREATE TABLE IF NOT EXISTS airports (
city_id INT,
lat FLOAT,
lon FLOAT,
icao CHAR(4),
iata CHAR(3),
name VARCHAR(200),
PRIMARY KEY(icao),
FOREIGN KEY(city_id) REFERENCES cities(city_id)
);
SELECT * FROM airports;
CREATE TABLE IF NOT EXISTS flights (
flights_id INT auto_increment,
icao CHAR(4),
date CHAR(10),
hour_day CHAR(5),
num_of_arriv INT,
num_of_depart INT,
PRIMARY KEY(flights_id),
FOREIGN KEY(icao) REFERENCES airports(icao)
);
SELECT * FROM flights;
The picture shown above graphically represents these four tables and their relations. The primary keys are indicated by the yellow symbols. Foreign keys are indicated by the pink diamonds. Every foreign key is a primary key of another table.
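To illustrate these relations, the foreign-key join between weathers and cities can be mimicked locally with pandas. This is a minimal sketch on toy rows (the city_id values and temperatures are made up), not a query against the actual database:

```python
import pandas as pd

# Toy stand-ins for the cities and weathers tables (made-up values):
cities = pd.DataFrame({'city_id': [1, 2],
                       'city': ['Vienna', 'Porto']})
weathers = pd.DataFrame({'city_id': [1, 1, 2],
                         'temperature': [14.2, 15.8, 19.1]})

# weathers.city_id is a foreign key into cities.city_id, so joining
# on it attaches the city name to every forecast row:
joined = weathers.merge(cities, on='city_id', how='left')
print(joined[['city', 'temperature']])
```

In SQL this corresponds to `SELECT * FROM weathers JOIN cities USING (city_id);`.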
Running the following Python script will push the dataframes from the CSV files to the AWS database: cities.csv into the cities table and airports.csv into the airports table.
import pandas as pd
import requests
import sqlalchemy
import config as cfg
import pymysql
schema="gans"
host=cfg.DATABASE_HOST
user=cfg.DATABASE_USER
password=cfg.DATABASE_PASSWORD
port=cfg.DATABASE_PORT
con = f'mysql+pymysql://{user}:{password}@{host}:{port}/{schema}'
path='PATH'
cities_df = pd.read_csv(path+'cities.csv')
cities_df.to_sql('cities',
                 if_exists='append',
                 con=con,
                 index=False)

airports = pd.read_csv(path+'airports.csv')
airports.to_sql('airports',
                if_exists='append',
                con=con,
                index=False)
NOTE: For this project, the weather collected is a 5-day forecast, while the flights data is a 1-day forecast. Two independent Lambda functions were implemented to separate the data updates for weather and flights.
This requires the following Python scripts:
from IPython.display import Image
Image(filename='lambda_code.png',width=800, height=400)
This code will collect the weather forecast in 3-hour intervals for the following 5 days, build a dataframe, and push the data to the weathers table in the database.
DATABASE_HOST = 'xxxxxxxxxxxxxxxxxxxxxx'
DATABASE_USER = 'xxxx'
DATABASE_PASSWORD = 'xxxxxxxxx'
DATABASE_PORT = 'xxxx'
GEO_USERNAME = 'xxxxxx'
WEATHER_API_KEY = 'xxxxxx'
RAPIDAPI_KEY = 'xxxxxx'
import json
import requests
from datetime import datetime
import pandas as pd
import sqlalchemy
import weather

def lambda_handler(event, context):
    weather.get_weather()
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
import requests
from datetime import datetime
import pandas as pd
import config as cfg

def get_weather():
    schema = "gans"
    host = cfg.DATABASE_HOST
    user = cfg.DATABASE_USER
    password = cfg.DATABASE_PASSWORD
    port = cfg.DATABASE_PORT
    con = f'mysql+pymysql://{user}:{password}@{host}:{port}/{schema}'

    cities_df = pd.read_sql('cities', con=con)
    cities = cities_df['city']
    API_key = cfg.WEATHER_API_KEY

    rows = []
    for city in cities:
        url = f"http://api.openweathermap.org/data/2.5/forecast?q={city}&appid={API_key}&units=metric"
        response = requests.get(url)
        js = response.json()
        offset = int(cities_df[cities_df['city'] == city].time_zone.iloc[0])
        for x in js["list"]:
            # Rain and snow volumes are only present when there was precipitation:
            rain_vol = x.get("rain", {}).get("3h", 0)
            snow_vol = x.get("snow", {}).get("3h", 0)
            rows.append({'city_id': cities_df[cities_df['city'] == city].city_id.iloc[0],
                         'time_utc': x["dt"],
                         'local_time': datetime
                             .utcfromtimestamp(x["dt"] + offset)
                             .strftime('%Y-%m-%d %H:%M:%S'),
                         'temperature': x["main"]["temp"],
                         'humidity': x["main"]["humidity"],
                         'cloudiness_pc': x["clouds"]["all"],
                         'wind_speed': x["wind"]["speed"],
                         'precipitation_prob': x["pop"],
                         'rain_volume': rain_vol,
                         'snow_volume': snow_vol})

    weather = pd.DataFrame(rows, columns=['city_id', 'time_utc', 'local_time',
                                          'temperature', 'humidity', 'cloudiness_pc',
                                          'wind_speed', 'precipitation_prob',
                                          'rain_volume', 'snow_volume'])
    weather.to_sql('weathers',
                   if_exists='append',
                   con=con,
                   index=False)
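The local_time column above is built by shifting the UTC timestamp dt by the city's stored time_zone offset (in seconds, as written by the geocoding script). A minimal, self-contained sketch of that conversion, using only the standard library:

```python
from datetime import datetime, timezone

def to_local(dt_utc: int, offset_seconds: int) -> str:
    # Shift the epoch timestamp by the stored offset, then format it
    # the same way as the weathers.local_time column:
    shifted = datetime.fromtimestamp(dt_utc + offset_seconds, tz=timezone.utc)
    return shifted.strftime('%Y-%m-%d %H:%M:%S')

# A UTC+1 city (stored offset 3600 s) at the epoch:
print(to_local(0, 3600))  # 1970-01-01 01:00:00
```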
This code will collect the arrival and departure flight forecasts for the next day, categorize the results per hour of the day, build a dataframe, and finally push it to the flights table in the database.
import json
import requests
from datetime import datetime
import pandas as pd
import sqlalchemy
import flights

def lambda_handler(event, context):
    flights.get_flights()
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
import requests
import datetime
import pandas as pd
import numpy as np
import config as cfg

def get_flights():
    schema = "gans"
    host = cfg.DATABASE_HOST
    user = cfg.DATABASE_USER
    password = cfg.DATABASE_PASSWORD
    port = cfg.DATABASE_PORT
    con = f'mysql+pymysql://{user}:{password}@{host}:{port}/{schema}'

    #airports = pd.read_sql('airports', con=con)
    hour_day = ['00-01','01-02','02-03','03-04','04-05','05-06','06-07','07-08',
                '08-09','09-10','10-11','11-12','12-13','13-14','14-15','15-16',
                '16-17','17-18','18-19','19-20','20-21','21-22','22-23','23-00']
    air = ['LPPR']

    def get_time(f_js, i_list, i_json, arr_or_dep):
        # Extract the scheduled local time (HH:MM) of one flight movement:
        [date_, time_] = f_js[i_list][arr_or_dep][i_json]['movement']['scheduledTimeLocal'].split()
        [time_, _utc_offset] = time_.split('+')
        return datetime.datetime.strptime(time_, '%H:%M').time()

    def get_traffic_per_hour(times):
        # Count the flight movements falling into each of the 24 hourly buckets:
        per_hour = [0] * 24
        for t in times:
            per_hour[t.hour] += 1
        return per_hour

    rows = []
    for a_icao in air:  # airports['icao']:
        date = (datetime.date.today() + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
        # The API limits each request to a 12-hour window, so query the day in two halves:
        t1 = ['00:00', '12:00']
        t2 = ['11:59', '23:59']
        headers = {
            "X-RapidAPI-Host": "aerodatabox.p.rapidapi.com",
            "X-RapidAPI-Key": cfg.RAPIDAPI_KEY
        }
        flight_js = []
        for i in range(2):
            url = f"https://aerodatabox.p.rapidapi.com/flights/airports/icao/{a_icao}/{date}T{t1[i]}/{date}T{t2[i]}"
            response = requests.request("GET", url, headers=headers)
            print('Status code', response.status_code)
            if response.status_code != 200:
                print("ERROR communicating with the flights API")
                break
            flight_js.append(response.json())

        # CALCULATE THE ARRIVAL AND DEPARTURE COUNTS OF THE DAY:
        a_times_series = []
        d_times_series = []
        for i_list in range(len(flight_js)):
            for i_json in range(len(flight_js[i_list]['arrivals'])):
                a_times_series.append(get_time(flight_js, i_list, i_json, 'arrivals'))
            for i_json in range(len(flight_js[i_list]['departures'])):
                d_times_series.append(get_time(flight_js, i_list, i_json, 'departures'))
        a_l = get_traffic_per_hour(a_times_series)
        d_l = get_traffic_per_hour(d_times_series)

        # CREATE THE DATAFRAME OF ARRIVAL & DEPARTURE FLIGHTS PER AIRPORT, DAY, HOUR:
        for a, d, c in zip(a_l, d_l, hour_day):
            rows.append({'icao': a_icao,
                         'date': date,
                         'hour_day': c,
                         'num_of_arriv': a,
                         'num_of_depart': d})

    flights = pd.DataFrame(rows, columns=['icao', 'date', 'hour_day', 'num_of_arriv', 'num_of_depart'])
    # PUSH THE FLIGHTS DATA TO THE DATABASE:
    flights.to_sql('flights',
                   if_exists='append',
                   con=con,
                   index=False)
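The hourly categorization above reduces to counting scheduled times into 24 one-hour buckets. A stripped-down, self-contained sketch of that step on made-up times:

```python
from datetime import time

def traffic_per_hour(times):
    # Count how many movements fall into each of the 24 hourly buckets:
    per_hour = [0] * 24
    for t in times:
        per_hour[t.hour] += 1
    return per_hour

# Three made-up scheduled arrival times:
arrivals = [time(7, 15), time(7, 50), time(22, 5)]
print(traffic_per_hour(arrivals)[7])  # 2 arrivals in the 07-08 bucket
```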
The whole data engineering pipeline is now running in the cloud:
Now that the weather and flight forecasts are available in the database, updated in real time and accessible to everyone, it becomes much easier to predict the right place and time of day to locate the e-scooters and to provide valuable insights for Gans.